from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import datetime
import pendulum
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.bash_operator import BashOperator
# from datetime import datetime, timedelta

local_tz = pendulum.timezone("Asia/Shanghai")
from dagen.operators import SSHOperator


# Cron expression: run once a day at 02:00. The time is interpreted in the
# DAG's timezone (Asia/Shanghai, supplied via start_date's tzinfo below).
schedule_interval="0 2 * * *"

# DAG "child_dag": scheduled daily at 02:00 Asia/Shanghai; catchup=False so
# missed intervals are not backfilled when the scheduler falls behind.
# Rewritten from `DAG(**{...})` dict unpacking to plain keyword arguments —
# same values, idiomatic form.
dag = DAG(
    dag_id="child_dag",
    catchup=False,
    start_date=datetime.datetime(2021, month=10, day=20, tzinfo=local_tz),
    schedule_interval=schedule_interval,
    default_args={
        'owner': 'Avris',
        'depends_on_past': False,  # each run is independent of prior runs
        'retries': 2,              # retry each failed task twice
        'email': ['guanghu@tesla.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        # NOTE(review): a 'retry_delay' of timedelta(seconds=30) was previously
        # drafted but left disabled; Airflow's default retry_delay applies.
    },
)

# Sensor: block this DAG until task 'par_task1' of the upstream DAG
# 'dws_cdp_agg_smp_sm_activity_td_v2_generate' has finished.
# Removed the stale commented-out execution_delta=timedelta(minutes=5)
# alternative that contradicted the active execution_delta=None setting.
task1 = ExternalTaskSensor(
    task_id='child_task1',
    external_dag_id='dws_cdp_agg_smp_sm_activity_td_v2_generate',
    external_task_id='par_task1',
    # None means: look for the upstream run with the exact same execution
    # date as this run (same day / same schedule tick).
    execution_delta=None,
    timeout=60,  # give up after 60 seconds of waiting
    allowed_states=['success'],
    # Fail immediately (instead of waiting out the timeout) if the upstream
    # task failed or was skipped.
    failed_states=['failed', 'skipped'],
    check_existence=True,  # error out if the external DAG/task does not exist
    dag=dag,
)

# Placeholder downstream task: simply echoes "hello" via bash once the
# upstream sensor has succeeded.
ssh_hook_task = BashOperator(
    task_id='ssh_hook_task',
    bash_command='echo hello',
    dag=dag,
)



# Dependency: the sensor must succeed before ssh_hook_task runs
# (explicit-method form of `task1 >> ssh_hook_task`).
task1.set_downstream(ssh_hook_task)
